%%--- coding:utf-8 ---
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% File Name: mongo_protocol
%%% Created on : 2024/5/2 11:49
%%% @author Gaylen 252323463@qq.com
%%% @copyright (C) 2024, freedom
%%% @doc
%%%
%%% @end
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(mongo_protocol).
-author("Gaylen").
-include("mongodb_driver.hrl").
-include_lib("bson/include/bson_binary.hrl").

%% API0
-export([
%%    put_message/3,
%%    get_reply/1,
    encode_ping/2,
    encode_build_info/2,
    encode_is_master/2,
    encode_create_indexes/7,
    encode_insert/4,
    encode_update/4,
    encode_delete/4,
    encode_find_and_modify/7,
    encode_find_one/5,
    encode_find_limit100/5,
    encode_find_list/9,
    encode_get_more/5,
    encode_kill_cursors/4,
    encode_command/3,
    encode_opquery/1,
    decode_reply/1,

    priv_encode_opupdate/1,
    priv_encode_opinsert/1,
    priv_encode_opgetmore/1,
    priv_encode_opdelete/1,
    priv_encode_opkillcursors/1
]).

% RequestId expected to be in scope at call site
-define(put_header(RequestId, Opcode), ?put_int32(RequestId), ?put_int32(0), ?put_int32(Opcode)).
-define(get_header(RequestId, Opcode, ResponseTo), ?get_int32(RequestId), ?get_int32(ResponseTo), ?get_int32(Opcode)).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% 重新整理协议
encode_ping(RequestId, Db)->
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = {<<"ping">>,1, <<"$db">>,Db}
    },
    priv_encode_opmsg(OpMsgRec).

encode_build_info(RequestId, Db)->
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = {<<"buildInfo">>,1, <<"$db">>,Db}
    },
    priv_encode_opmsg(OpMsgRec).

encode_is_master(RequestId, Db) ->
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = {<<"isMaster">>,1, <<"$db">>,Db}
    },
    priv_encode_opmsg(OpMsgRec).

encode_create_indexes(RequestId, Db, Collection, IndexName, IndexSpecs, Unique, DropDups)->
    EnsureIndexDoc = {
        <<"createIndexes">>, Collection,
        <<"indexes">>, [{<<"name">>,IndexName, <<"key">>,IndexSpecs, <<"unique">>,Unique, <<"dropDups">>,DropDups}],
        <<"$db">>, Db
    },
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = EnsureIndexDoc
    },
    priv_encode_opmsg(OpMsgRec).

encode_insert(RequestId, Db, Collection, Docs)->
    DocsBin = priv_encode_docs(Docs),
    DocumentsFieldBin = bson_binary:put_cstring(<<"documents">>),
    DocsBinWithDocuments = <<DocumentsFieldBin/binary, DocsBin/binary>>,
    DocsBinWithDocumentsLen = byte_size(DocsBinWithDocuments) + 4,
    OpBin = bson_binary:put_document({
        <<"insert">>, Collection,
        <<"ordered">>, true,
%%        <<"lsid">>, {<<"id">>, uuid:get_v4()},    %% TODO 批量插入时定义事件id字段: lsid
        <<"$readPreference">>, {<<"mode">>, <<"primaryPreferred">>},
        <<"$db">>, Db
    }),
    OpMsg = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_MSG:32/signed-little, %% opCode
        0:32/unsigned-little,         %% flagBits set to 0
        1:8/unsigned-little, DocsBinWithDocumentsLen:32/unsigned-little, DocsBinWithDocuments/binary,
        0:8, OpBin/binary %% section
    >>,
    MsgLength = (byte_size(OpMsg)+4),
    <<MsgLength:32/signed-little, OpMsg/binary>>.

%% UpdateMapList: [#{q=>SelectorMap,u=>UpdateMap,multi=>false,upsert=>true}]
encode_update(RequestId, Db, Collection, Docs)->
    DocsBin = priv_encode_docs(Docs),
    DocumentsFieldBin = bson_binary:put_cstring(<<"updates">>),
    DocsBinWithDocuments = <<DocumentsFieldBin/binary, DocsBin/binary>>,
    DocsBinWithDocumentsLen = byte_size(DocsBinWithDocuments) + 4,
    OpBin = bson_binary:put_document({
        <<"update">>, Collection,
        <<"ordered">>, true,
%%        <<"lsid">>, {<<"id">>, uuid:get_v4()},    %% TODO 批量插入时定义事件id字段: lsid
        <<"$readPreference">>, {<<"mode">>, <<"primaryPreferred">>},
        <<"$db">>, Db
    }),
    OpMsg = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_MSG:32/signed-little, %% opCode
        0:32/unsigned-little,         %% flagBits set to 0
        1:8/unsigned-little, DocsBinWithDocumentsLen:32/unsigned-little, DocsBinWithDocuments/binary,
        0:8, OpBin/binary %% section
    >>,
    MsgLength = (byte_size(OpMsg)+4),
    <<MsgLength:32/signed-little, OpMsg/binary>>.

encode_delete(RequestId, Db, Collection, Docs)->
    DocsBin = priv_encode_docs(Docs),
    DocumentsFieldBin = bson_binary:put_cstring(<<"deletes">>),
    DocsBinWithDocuments = <<DocumentsFieldBin/binary, DocsBin/binary>>,
    DocsBinWithDocumentsLen = byte_size(DocsBinWithDocuments) + 4,
    OpBin = bson_binary:put_document({
        <<"delete">>, Collection,
        <<"$db">>, Db
    }),
    OpMsg = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_MSG:32/signed-little, %% opCode
        0:32/unsigned-little,         %% flagBits set to 0
        1:8/unsigned-little, DocsBinWithDocumentsLen:32/unsigned-little, DocsBinWithDocuments/binary,
        0:8, OpBin/binary %% section
    >>,
    MsgLength = (byte_size(OpMsg)+4),
    <<MsgLength:32/signed-little, OpMsg/binary>>.

encode_find_and_modify(RequestId, Db, Collection, Selector, UpdateDoc, Projector, IsUpInsert)->
    FindAndModifyDoc = {
        <<"findAndModify">>, Collection,
        <<"query">>, Selector,
        <<"update">>, UpdateDoc,
        <<"fields">>, Projector,
        <<"new">>, true,
        <<"upsert">>, IsUpInsert,
        <<"$db">>, Db
    },
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = FindAndModifyDoc
    },
    priv_encode_opmsg(OpMsgRec).

encode_find_one(RequestId, Db, Collection, Selector, Projector)->
    FindDoc = {
        <<"find">>, Collection,
        <<"filter">>, Selector,
        <<"projection">>, Projector,
        <<"limit">>, 1,
        <<"singleBatch">>, true,
        <<"$db">>, Db
    },
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = FindDoc
    },
    priv_encode_opmsg(OpMsgRec).

encode_find_limit100(RequestId, Db, Collection, QueryMap, Projector)->
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = {find,Collection,'$db',Db, filter,QueryMap, projection,Projector, singleBatch,true}
    },
    priv_encode_opmsg(OpMsgRec).

encode_find_list(RequestId, Db, Collection, Selector, SortDoc, Projector, Skip, Limit, IsSlave)->
    FindDoc =
        if
            IsSlave ->
                {
                    <<"find">>, Collection,
                    <<"filter">>, Selector,
                    <<"skip">>, Skip,
                    <<"limit">>, Limit,
                    <<"singleBatch">>, false,
                    <<"sort">>, SortDoc,
                    <<"projection">>, Projector,
                    <<"$readPreference">>, {<<"mode">>, <<"secondaryPreferred">>},
                    <<"$db">>, Db
                };
            true ->
                {
                    <<"find">>, Collection,
                    <<"filter">>, Selector,
                    <<"skip">>, Skip,
                    <<"limit">>, Limit,
                    <<"singleBatch">>, false,
                    <<"sort">>, SortDoc,
                    <<"projection">>, Projector,
                    <<"$readPreference">>, {<<"mode">>, <<"primaryPreferred">>},
                    <<"$db">>, Db
                }
        end,
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = FindDoc
    },
    priv_encode_opmsg(OpMsgRec).

encode_get_more(RequestId, Db, Collection, CursorId, BatchSize)->
    GetMoreDoc = {
        <<"getMore">>,CursorId,
        <<"collection">>,Collection,
        <<"batchSize">>,BatchSize,
        <<"$db">>,Db
    },
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = GetMoreDoc
    },
    priv_encode_opmsg(OpMsgRec).

encode_kill_cursors(RequestId, Db, Collection, CursorId)->
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = {<<"killCursors">>,Collection, '$db',Db, cursors,[CursorId]}
    },
    priv_encode_opmsg(OpMsgRec).

encode_command(RequestId, Db, Command) ->
    CommandWithDb = bson:update('$db', Db, Command),
%%    io:format("~p", [CommandWithDb]),
    OpMsgRec = #mongo_opmsg{
        request_id = RequestId,
        payload_type = 0,
        document = CommandWithDb
    },
    priv_encode_opmsg(OpMsgRec).

decode_reply(ReplyBin) ->
    <<
        RequestId:32/signed-little,      %% request Id
        ResponseTo:32/signed-little,  %% response To 响应的是哪个请求id
        OpCode:32/signed-little, %% opCode
        RestReplyBin/binary
    >> = ReplyBin,
    if
        OpCode =:= ?MONGO_OP_MSG ->
            <<
                ?get_bits32(_B1, _B2, _B3, _B4, _B5, _B6, _MoreToCome, ChecksumPresent),
                _Kind:8,
                SectionsAndCrc32Bin/binary
            >> = RestReplyBin,
            if
                ChecksumPresent =:= 1 ->
                    %% TODO 检查CRC32
                    ok;
                true ->
                    ok
            end,
            {RespMap, NewRestReplyBin} = bson_binary:get_map(SectionsAndCrc32Bin),
            OpMsgRec = #mongo_opmsg{
                request_id = ResponseTo,
                payload_type = 0,
                document = RespMap
            },
            {OpMsgRec, NewRestReplyBin};
        true ->
            <<
                ?get_bits32(_B1, _B2, _B3, _B4, FlagsAwaitCapable, FlagsShardConfigStale, FlagsQueryFailure, FlagsCursorNotFound),
                CursorId:64/signed-little,
                StartingFrom:32/signed-little,
                NumberReturned:32/signed-little,
                RespDocsBin/binary %% section
            >> = RestReplyBin,
            {RespDocs, RestReplyBin} = priv_decode_docs(NumberReturned, RespDocsBin, []),
            OpReplyRec = #mongo_opreply{
                request_id = RequestId,
                response_id = ResponseTo,
                res_flag_cursor_not_found = bool(FlagsCursorNotFound),
                res_flag_query_failure = bool(FlagsQueryFailure),
                res_flag_shard_config_stale = bool(FlagsShardConfigStale),
                res_flag_await_capable = bool(FlagsAwaitCapable),
                cursor_id = CursorId,
                starting_from = StartingFrom,
                number_returned = NumberReturned,
                documents = RespDocs
            },
            {OpReplyRec, RestReplyBin}
    end.

%% @private
priv_encode_docs(Docs) when is_list(Docs) ->
    priv_encode_docs(<<>>, Docs);
priv_encode_docs(Doc) ->
    bson_binary:put_document(Doc).

%% @private
priv_encode_docs(Bin, []) ->
    Bin;
priv_encode_docs(AccBin, [Doc | TailDocs]) ->
    DocBin = bson_binary:put_document(Doc),
    priv_encode_docs(<<AccBin/binary, DocBin/binary>>, TailDocs).

%% @private
priv_decode_docs(0, Bin, Docs) ->
    {lists:reverse(Docs), Bin};
priv_decode_docs(NumDocs, Bin, Docs) ->
    {Doc, RestBin} = bson_binary:get_map(Bin),
    priv_decode_docs(NumDocs - 1, RestBin, [Doc | Docs]).

%% @private
priv_encode_opmsg(OpMsgRec) when OpMsgRec#mongo_opmsg.payload_type =:= 0 ->
    #mongo_opmsg{
        request_id = RequestId,
        document = Doc
    } = OpMsgRec,
    DocBin = bson_binary:put_document(Doc),
    OpMsg = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_MSG:32/signed-little, %% opCode
        0:32/unsigned-little,         %% flagBits set to 0
        0:8, DocBin/binary %% section
    >>,
    MsgLength = (byte_size(OpMsg)+4),
    <<MsgLength:32/signed-little, OpMsg/binary>>.

%% mongodb 5.0之后废弃
%% @private
priv_encode_opupdate(OpUpdateRec) ->
    #mongo_opupdate{
        request_id = RequestId,
        zero = ZeroFlag,
        database = Db,
        collection = Collection,
        flags_upsert = UpdateInsert,
        flags_multiupdate = MultiUpdate,
        selector = SelectorDoc,
        updater = UpdateDoc
    } = OpUpdateRec,
    OpUpdate = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_UPDATE:32/signed-little, %% opCode
        ZeroFlag:32/unsigned-little,         %% flagBits set to 0
        (bson_binary:put_cstring(mc_utils:dbcoll(Db, Collection)))/binary,
        ?put_bits32(0, 0, 0, 0, 0, 0, bit(MultiUpdate), bit(UpdateInsert)),
        (bson_binary:put_document(SelectorDoc))/binary,
        (bson_binary:put_document(UpdateDoc))/binary
    >>,
    MsgLength = (byte_size(OpUpdate)+4),
    <<MsgLength:32/signed-little, OpUpdate/binary>>.

%% mongodb 5.0之后废弃
%% @private
priv_encode_opinsert(OpInsertRec) ->
    #mongo_opinsert{
        request_id = RequestId,
        flags_continue_on_error = FlagsContinueOnError,
        database = Db,
        collection = Collection,
        documents = InsertDocs
    } = OpInsertRec,
    OpInsert = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_INSERT:32/signed-little, %% opCode
        ?put_bits32(0, 0, 0, 0, 0, 0, 0, bit(FlagsContinueOnError)),         %% flagBits set to 0
        (bson_binary:put_cstring(mc_utils:dbcoll(Db, Collection)))/binary,
        << <<(bson_binary:put_document(Doc))/binary>> || Doc <- InsertDocs>>/binary
    >>,
    MsgLength = (byte_size(OpInsert)+4),
    <<MsgLength:32/signed-little, OpInsert/binary>>.

encode_opquery(OpQueryRec) ->
    #mongo_opquery{
        request_id = RequestId,
        flags_tailable_cursor = FlagsTailableCursor,
        flags_slave_ok = FlagsSlaveOk,
        flags_oplog_replay = FlagsOplogReplay,
        flags_no_cursor_timeout = FlagsNoCursorTimeout,
        flags_await_data = FlagsAwaitData,
        flags_exhaust = FlagsExhaust,
        flags_part_read = FlagsPartRead,
        database = Db,
        collection = Collection,
        number_to_skip = NumberToSkip,
        number_to_return = NumberToReturn,
        selector = Selector,
        return_fields_selector = ReturnFieldsSelector
    } = OpQueryRec,
    OpQuery0 = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_QUERY:32/signed-little, %% opCode
        ?put_bits32(bit(FlagsPartRead), bit(FlagsExhaust), bit(FlagsAwaitData), bit(FlagsNoCursorTimeout), bit(FlagsOplogReplay), bit(FlagsSlaveOk), bit(FlagsTailableCursor), 0),         %% flagBits set to 0
        (bson_binary:put_cstring(mc_utils:dbcoll(Db, Collection)))/binary,
        NumberToSkip:32/signed-little,
        NumberToReturn:32/signed-little,
        (bson_binary:put_document(Selector))/binary
    >>,
    OpQuery =
        if
            ReturnFieldsSelector =/= undefined -> << OpQuery0/binary, (bson_binary:put_document(ReturnFieldsSelector))/binary >>;
            true -> OpQuery0
        end,
    MsgLength = (byte_size(OpQuery)+4),
    <<MsgLength:32/signed-little, OpQuery/binary>>.

priv_encode_opgetmore(OpGetMoreRec) ->
    #mongo_opgetmore{
        request_id = RequestId,
        zero = ZeroFlag,
        database = Db,
        collection = Collection,
        number_to_return = NumberToReturn,
        cursor_id = CursorId
    } = OpGetMoreRec,
    OpGetMore = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_GET_MORE:32/signed-little, %% opCode
        ZeroFlag:32/signed-little,         %% flagBits set to 0
        (bson_binary:put_cstring(mc_utils:dbcoll(Db, Collection)))/binary,
        NumberToReturn:32/signed-little,
        CursorId:64/signed-little
    >>,
    MsgLength = (byte_size(OpGetMore)+4),
    <<MsgLength:32/signed-little, OpGetMore/binary>>.

priv_encode_opdelete(OpDeleteRec) ->
    #mongo_opdelete{
        request_id = RequestId,
        zero = ZeroFlag,
        database = Db,
        collection = Collection,
        flags_single_remove = FlagsSingleRemove,
        selector = Selector
    } = OpDeleteRec,
    OpDelete = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_DELETE:32/signed-little, %% opCode
        ZeroFlag:32/signed-little,         %% flagBits set to 0
        (bson_binary:put_cstring(mc_utils:dbcoll(Db, Collection)))/binary,
        ?put_bits32(0, 0, 0, 0, 0, 0, 0, bit(FlagsSingleRemove)),
        (bson_binary:put_document(Selector))/binary
    >>,
    MsgLength = (byte_size(OpDelete)+4),
    <<MsgLength:32/signed-little, OpDelete/binary>>.

priv_encode_opkillcursors(OpKillCursorsRec) ->
    #mongo_opkillcursors{
        request_id = RequestId,
        zero = ZeroFlag,
        number_of_cursor_ids = NumberOfCursorIds,
        cursor_ids = CursorIds
    } = OpKillCursorsRec,
    OpKillCursors = <<
%%  messageLength                 %% messageLength set later
        RequestId:32/signed-little,       %% request Id
        0:32/signed-little,           %% response To
        ?MONGO_OP_KILL_CURSORS:32/signed-little, %% opCode
        ZeroFlag:32/signed-little,         %% flagBits set to 0
        NumberOfCursorIds:32/signed-little,
        << <<CursorId:64/signed-little>> || CursorId <- CursorIds>>/binary
    >>,
    MsgLength = (byte_size(OpKillCursors)+4),
    <<MsgLength:32/signed-little, OpKillCursors/binary>>.

%% @private
bit(false) -> 0;
bit(true) -> 1.

%% @private
bool(0) -> false;
bool(1) -> true.